package net.quanter.shield.mq.rocketmq.producer;

import com.alibaba.fastjson2.JSON;
import com.aliyun.openservices.ons.api.*;
import lombok.extern.slf4j.Slf4j;
import net.quanter.shield.common.dto.result.ResultDTO;
import net.quanter.shield.mq.MQMessageVO;
import net.quanter.shield.mq.MQProducer;
import net.quanter.shield.mq.rocketmq.param.RocketMQBorkerParam;
import net.quanter.shield.mq.rocketmq.param.RocketMQTopicParam;

import java.util.Map;
import java.util.Properties;

/**
 * {@link MQProducer} implementation that publishes messages through the
 * Aliyun ONS (RocketMQ) TCP client.
 *
 * <p>The underlying {@link Producer} is created and started in the constructor
 * and must be released with {@link #close()} when it is no longer needed.
 *
 * @param <T> payload type declared by {@link MQProducer}; it is not referenced
 *            directly by this class
 */
@Slf4j
public class RocketMQTcpProducer<T> implements MQProducer<T, Message> {
    /** Send timeout handed to the ONS client, in milliseconds. */
    private static final String SEND_TIMEOUT_MILLIS = "2000";

    final RocketMQBorkerParam mqConnectVO;
    final RocketMQTopicParam topic;
    final Producer tcpProducer;

    /**
     * Builds the ONS client configuration from the connection parameters, then
     * creates and starts the TCP producer.
     *
     * @param mqConnectVO broker connection parameters (access id, access key, endpoint)
     * @param topic       topic parameters; stored on the instance but not used
     *                    while configuring the producer
     */
    public RocketMQTcpProducer(RocketMQBorkerParam mqConnectVO, RocketMQTopicParam topic) {
        this.mqConnectVO = mqConnectVO;
        this.topic = topic;

        Properties properties = new Properties();
        properties.put(PropertyKeyConst.AccessKey, mqConnectVO.getAccessId());
        // AccessKeySecret: Alibaba Cloud identity credential, created in the Alibaba Cloud management console.
        properties.put(PropertyKeyConst.SecretKey, mqConnectVO.getAccessKey());
        // Send timeout, in milliseconds.
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, SEND_TIMEOUT_MILLIS);
        // TCP endpoint; shown in the "endpoint" area of the instance details page of the RocketMQ console.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, mqConnectVO.getEndPoint());

        tcpProducer = ONSFactory.createProducer(properties);
        tcpProducer.start();
    }

    /**
     * Converts the generic message holder into an ONS {@link Message}.
     *
     * <p>Topic, tag and body are taken from the holder, the sharding key and
     * message id are copied, and every entry of the holder's property map is
     * attached as a user property. A {@code null} property map is treated as empty.
     *
     * @param mqMessageVO source message holder
     * @return the ONS message ready to be passed to {@link #send(MQMessageVO, Message)}
     */
    @Override
    public Message getSourceMessageFromMQMessage(MQMessageVO mqMessageVO) {
        Message message = new Message(
                mqMessageVO.getTopic(),
                mqMessageVO.getTag(),
                mqMessageVO.getBase64Obj()
        );
        message.setShardingKey(mqMessageVO.getShardKey());
        message.setMsgID(mqMessageVO.getMessageId());

        Map<String, String> properties = mqMessageVO.getProperties();
        // A holder without user properties must not abort the conversion.
        if (properties != null) {
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                message.putUserProperties(entry.getKey(), entry.getValue());
            }
        }
        return message;
    }

    /**
     * Sends the message synchronously through the TCP producer.
     *
     * <p>On success the id reported by the send result is written back to
     * {@code mqMessageVO}: the message id is replaced only when the reported id
     * is non-null, while the request id always mirrors the reported id. Any
     * failure is logged and converted into a failure result carrying the
     * exception message.
     *
     * @param mqMessageVO message holder, updated with the ids from the send result
     * @param message     ONS message to send
     * @return {@code ResultDTO.SUCCESS} when the send completes, otherwise a failure result
     */
    @Override
    public ResultDTO send(MQMessageVO mqMessageVO, Message message) {
        try {
            SendResult sendResult = tcpProducer.send(message);
            String sentMessageId = sendResult.getMessageId();
            // Keep the id already on the holder when the send result does not carry one.
            if (sentMessageId != null) {
                mqMessageVO.setMessageId(sentMessageId);
            }
            mqMessageVO.setRequestId(sentMessageId);
            return ResultDTO.SUCCESS;
        } catch (Throwable e) {
            log.error("RocketMQTcpProducer send error,mqMessageVO={}",
                    JSON.toJSON(mqMessageVO),
                    e);
            return ResultDTO.failure().message(e.getMessage());
        }
    }

    /** Shuts down the underlying TCP producer. */
    @Override
    public void close() {
        tcpProducer.shutdown();
    }
}
